#RxJava2
SimpleExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable.just("Cricket", "Football")..subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
});

逐行打印出数据**

map操作**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
* Here we are getting ApiUser Object from api server
* then we are converting it into User Object because
* may be our database support User Not ApiUser Object
* Here we are using Map Operator to do that
*/
/*
* Here we are getting ApiUser Object from api server
* then we are converting it into User Object because
* may be our database support User Not ApiUser Object
* Here we are using Map Operator to do that
*/
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<List<ApiUser>, List<User>>() {

@Override
public List<User> apply(List<ApiUser> apiUsers) throws Exception {
return Utils.convertApiUserListToUserList(apiUsers);
}
})
.subscribe(getObserver());
}

private Observable<List<ApiUser>> getObservable() {
return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getApiUserList());
e.onComplete();
}
}
});
}

private Observer<List<User>> getObserver() {
return new Observer<List<User>>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(List<User> userList) {
textView.append(" onNext");
textView.append(AppConstant.LINE_SEPARATOR);
for (User user : userList) {
textView.append(" firstName : " + user.firstName);
textView.append(AppConstant.LINE_SEPARATOR);
}
Log.d(TAG, " onNext : " + userList.size());
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}

zip**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/*
* Here we are getting two user list
* One, the list of cricket fans
* Another one, the list of football fans
* Then we are finding the list of users who loves both
*/
private void doSomeWork() {
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
}
})
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}

private Observable<List<User>> getCricketFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
});
}

private Observable<List<User>> getFootballFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesFootball());
e.onComplete();
}
}
});
}

private Observer<List<User>> getObserver() {
return new Observer<List<User>>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(List<User> userList) {
textView.append(" onNext");
textView.append(AppConstant.LINE_SEPARATOR);
for (User user : userList) {
textView.append(" firstName : " + user.firstName);
textView.append(AppConstant.LINE_SEPARATOR);
}
Log.d(TAG, " onNext : " + userList.size());
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}

Disposable**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // do not send event after activity has been destroyed
}

/*
* Example to understand how to use disposables.
* disposables is cleared in onDestroy of this activity.
*/
void doSomeWork() {
disposables.add(sampleObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
}));
}

static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
// Do some long running operation
SystemClock.sleep(2000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}

take**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/* Using take operator, it only emits
* required number of values. here only 3 out of 5
*/
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver());
}

private Observable<Integer> getObservable() {
return Observable.just(1, 2, 3, 4, 5);
}

private Observer<Integer> getObserver() {
return new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(Integer value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}

Timer**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/*
* simple example using timer to do something after 2 second
*/
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}

private Observable<? extends Long> getObservable() {
return Observable.timer(2, TimeUnit.SECONDS);
}

private Observer<Long> getObserver() {
return new Observer<Long>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(Long value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}

Interval**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // clearing it : do not emit after destroy
}

/*
* simple example using interval to run task at an interval of 2 sec
* which start immediately
*/
private void doSomeWork() {
disposables.add(getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(getObserver()));
}

private Observable<? extends Long> getObservable() {
return Observable.interval(0, 2, TimeUnit.SECONDS);
}

private DisposableObserver<Long> getObserver() {
return new DisposableObserver<Long>() {

@Override
public void onNext(Long value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}

SingleObserver**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* simple example using SingleObserver
*/
private void doSomeWork() {
Single.just("Amit")
.subscribe(getSingleObserver());
}

private SingleObserver<String> getSingleObserver() {
return new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onSuccess(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};
}

CompletableObserver**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*
* simple example using CompletableObserver
*/
private void doSomeWork() {
Completable completable = Completable.timer(1000, TimeUnit.MILLISECONDS);

completable
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getCompletableObserver());
}

private CompletableObserver getCompletableObserver() {
return new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
* simple example using Flowable
*/
private void doSomeWork() {

Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);

observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).subscribe(getObserver());

}

private SingleObserver<Integer> getObserver() {

return new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onSuccess(Integer value) {
textView.append(" onSuccess : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onSuccess : value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/*
* simple example using reduce to add all the number
*/
private void doSomeWork() {
getObservable()
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
})
.subscribe(getObserver());
}

private Observable<Integer> getObservable() {
return Observable.just(1, 2, 3, 4);
}

private MaybeObserver<Integer> getObserver() {
return new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onSuccess(Integer value) {
textView.append(" onSuccess : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onSuccess : value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/*
* simple example using buffer operator - bundles all emitted values into a list
*/
private void doSomeWork() {

Observable<List<String>> buffered = getObservable().buffer(3, 1);

// 3 means, it takes max of three from its start index and create list
// 1 means, it jumps one step every time
// so the it gives the following list
// 1 - one, two, three
// 2 - two, three, four
// 3 - three, four, five
// 4 - four, five
// 5 - five

buffered.subscribe(getObserver());
}

private Observable<String> getObservable() {
return Observable.just("one", "two", "three", "four", "five");
}

private Observer<List<String>> getObserver() {
return new Observer<List<String>>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(List<String> stringList) {
textView.append(" onNext size : " + stringList.size());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : size :" + stringList.size());
for (String value : stringList) {
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " : value :" + value);
}

}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/*
* simple example by using filter operator to emit only even value
*
*/
private void doSomeWork() {
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(getObserver());
}


private Observer<Integer> getObserver() {
return new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(Integer value) {
textView.append(" onNext : ");
textView.append(AppConstant.LINE_SEPARATOR);
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext ");
Log.d(TAG, " value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/* Using replay operator, replay ensure that all observers see the same sequence
* of emitted items, even if they subscribe after the Observable has begun emitting items
*/
private void doSomeWork() {

PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable

connectableObservable.subscribe(getFirstObserver());

source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();

/*
* it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
*/
connectableObservable.subscribe(getSecondObserver());

}


private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " First onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(Integer value) {
textView.append(" First onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onNext value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" First onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" First onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onComplete");
}
};
}

private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
textView.append(AppConstant.LINE_SEPARATOR);
}

@Override
public void onNext(Integer value) {
textView.append(" Second onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onNext value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" Second onError : " + e.getMessage());
Log.d(TAG, " Second onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" Second onComplete");
Log.d(TAG, " Second onComplete");
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
* Using concat operator to combine Observable : concat maintain
* the order of Observable.
* It will emit all the 7 values in order
* here - first "A1", "A2", "A3", "A4" and then "B1", "B2", "B3"
* first all from the first Observable and then
* all from the second Observable all in order
*/
private void doSomeWork() {
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};

final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);

Observable.concat(aObservable, bObservable)
.subscribe(getObserver());
}


private Observer<String> getObserver() {
return new Observer<String>() {

@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}

@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}

@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}